Coverage for cpprb/PyReplayBuffer.pyx: 96%
1# distutils: language = c++
2# cython: linetrace=True
4import ctypes
5from logging import getLogger, StreamHandler, Formatter, INFO
6import multiprocessing as mp
7import time
8from typing import Any, Dict, Callable, Optional
9import warnings
11cimport numpy as np
12import numpy as np
13import cython
14from cython.operator cimport dereference
16from cpprb.ReplayBuffer cimport *
17from cpprb.multiprocessing import RawArray, RawValue, _has_SharedMemory, try_start
19from .VectorWrapper cimport *
20from .VectorWrapper import (VectorWrapper,
21 VectorInt,VectorSize_t,
22 VectorDouble,PointerDouble,VectorFloat)
25def default_logger(level=INFO):
26 """
27 Create default logger for cpprb
28 """
29 logger = getLogger("cpprb")
30 logger.setLevel(level)
32 if not logger.hasHandlers():
33 handler = StreamHandler()
34 handler.setLevel(level)
36 fmt = Formatter("%(asctime)s.%(msecs)03d [%(levelname)s] " +
37 "(%(filename)s:%(lineno)s) %(message)s",
38 "%Y%m%d-%H%M%S")
39 handler.setFormatter(fmt)
40 logger.addHandler(handler)
41 logger.propagate = False
43 return logger
45cdef double [::1] Cdouble(array):
46 return np.ravel(np.array(array,copy=False,dtype=np.double,ndmin=1,order='C'))
48cdef inline const size_t [::1] Csize(array):
49 return np.ravel(np.array(array,copy=False,dtype=np.uint64,ndmin=1,order='C'))
51@cython.embedsignature(True)
52cdef inline const float [::1] Cfloat(array):
53 return np.ravel(np.array(array,copy=False,dtype=np.single,ndmin=1,order='C'))
56def unwrap(d):
57 return d[np.newaxis][0]
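# Example (minimal sketch): ``numpy.savez`` wraps a pickled dict into a 0-d
# object array; ``unwrap`` recovers the original object.
#
# >>> wrapped = np.array({"obs": np.zeros(3)})  # 0-d object ndarray
# >>> unwrap(wrapped)
# {'obs': array([0., 0., 0.])}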
60@cython.embedsignature(True)
61cdef class Environment:
62 """
63 Base class to store environment
64 """
65 cdef PointerDouble obs
66 cdef PointerDouble act
67 cdef PointerDouble rew
68 cdef PointerDouble next_obs
69 cdef PointerDouble done
70 cdef size_t buffer_size
71 cdef size_t obs_dim
72 cdef size_t act_dim
73 cdef size_t rew_dim
74 cdef bool is_discrete_action
75 cdef obs_shape
77 def __cinit__(self,size,obs_dim=1,act_dim=1,*,
78 rew_dim=1,is_discrete_action = False,
79 obs_shape = None, **kwargs):
80 self.obs_shape = obs_shape
81 self.is_discrete_action = is_discrete_action
83 cdef size_t _dim
84 if self.obs_shape is None:
85 self.obs_dim = obs_dim
86 else:
87 self.obs_dim = 1
88 for _dim in self.obs_shape:
89 self.obs_dim *= _dim
91 self.act_dim = act_dim if not self.is_discrete_action else 1
92 self.rew_dim = rew_dim
94 self.obs = PointerDouble(ndim=2,value_dim=self.obs_dim,size=size)
95 self.act = PointerDouble(ndim=2,value_dim=self.act_dim,size=size)
96 self.rew = PointerDouble(ndim=2,value_dim=self.rew_dim,size=size)
97 self.next_obs = PointerDouble(ndim=2,value_dim=self.obs_dim,size=size)
98 self.done = PointerDouble(ndim=2,value_dim=1,size=size)
100 def __init__(self,size,obs_dim=1,act_dim=1,*,
101 rew_dim=1,is_discrete_action = False,
102 obs_shape = None, **kwargs):
103 """
104 Parameters
105 ----------
106 size : int
107 buffer size
108 obs_dim : int, optional
109 observation (obs) dimension whose default value is 1
110 act_dim : int, optional
111 action (act) dimension whose default value is 1
112 rew_dim : int, optional
113 reward (rew) dimension whose default value is 1
114 is_discrete_action: bool, optional
115 If True, act_dim is compressed to 1. The default value is False.
116 obs_shape: array-like, optional
117 observation shape. If not None, it overwrites obs_dim.
118 """
119 pass
121 cdef size_t _add(self,double [::1] o,double [::1] a,double [::1] r,
122 double [::1] no,double [::1] d):
123 raise NotImplementedError
125 def add(self,obs,act,rew,next_obs,done):
126 """
127 Add environment(s) into replay buffer.
128 Multiple steps can be added simultaneously.
130 Parameters
131 ----------
132 obs : array_like or float or int
133 observation(s)
134 act : array_like or float or int
135 action(s)
136 rew : array_like or float or int
137 reward(s)
138 next_obs : array_like or float or int
139 next observation(s)
140 done : array_like or float or int
141 done(s)
143 Returns
144 -------
145 int
146 the stored first index
147 """
148 return self._add(Cdouble(obs),Cdouble(act),Cdouble(rew),Cdouble(next_obs),Cdouble(done))
150 def _encode_sample(self,idx):
151 dtype = int if self.is_discrete_action else np.double  # np.int was removed in NumPy >= 1.24
153 _o = self.obs.as_numpy()[idx]
154 _no = self.next_obs.as_numpy()[idx]
155 if self.obs_shape is not None:
156 _shape = (-1,*self.obs_shape)
157 _o = _o.reshape(_shape)
158 _no = _no.reshape(_shape)
160 return {'obs': _o,
161 'act': self.act.as_numpy(dtype=dtype)[idx],
162 'rew': self.rew.as_numpy()[idx],
163 'next_obs': _no,
164 'done': self.done.as_numpy()[idx]}
166 cpdef size_t get_buffer_size(self):
167 """
168 Get buffer size
170 Parameters
171 ----------
173 Returns
174 -------
175 size_t
176 buffer size
177 """
178 return self.buffer_size
180 cdef void _update_size(self,size_t new_size):
181 """ Update environment size
183 Parameters
184 ----------
185 new_size : size_t
186 new size to set as environment (obs,act,rew,next_obs,done)
188 Returns
189 -------
190 """
191 self.obs.update_vec_size(new_size)
192 self.act.update_vec_size(new_size)
193 self.rew.update_vec_size(new_size)
194 self.next_obs.update_vec_size(new_size)
195 self.done.update_vec_size(new_size)
197 cpdef size_t get_obs_dim(self):
198 """Return observation dimension (obs_dim)
199 """
200 return self.obs_dim
202 def get_obs_shape(self):
203 """Return observation shape
204 """
205 return self.obs_shape
207 cpdef size_t get_act_dim(self):
208 """Return action dimension (act_dim)
209 """
210 return self.act_dim
212 cpdef size_t get_rew_dim(self):
213 """Return reward dimension (rew_dim)
214 """
215 return self.rew_dim
218@cython.embedsignature(True)
219cdef class SelectiveEnvironment(Environment):
220 """
221 Base class for episode-level management environment
222 """
223 cdef CppSelectiveEnvironment[double,double,double,double] *buffer
224 def __cinit__(self,episode_len,obs_dim=1,act_dim=1,*,Nepisodes=10,rew_dim=1,**kwargs):
225 self.buffer_size = episode_len * Nepisodes
227 self.buffer = new CppSelectiveEnvironment[double,double,
228 double,double](episode_len,
229 Nepisodes,
230 self.obs_dim,
231 self.act_dim,
232 self.rew_dim)
234 self.buffer.get_buffer_pointers(self.obs.ptr,
235 self.act.ptr,
236 self.rew.ptr,
237 self.next_obs.ptr,
238 self.done.ptr)
240 def __init__(self,episode_len,obs_dim=1,act_dim=1,*,Nepisodes=10,rew_dim=1,**kwargs):
241 """
242 Parameters
243 ----------
244 episode_len : int
245 the max size of environments in a single episode
246 obs_dim : int
247 observation (obs, next_obs) dimension
248 act_dim : int
249 action (act) dimension
250 Nepisodes : int, optional
251 the max size of stored episodes
252 rew_dim : int, optional
253 reward (rew) dimension
254 """
255 pass
257 cdef size_t _add(self,double [::1] obs,double [::1] act, double [::1] rew,
258 double [::1] next_obs, double [::1] done):
259 return self.buffer.store(&obs[0],&act[0],&rew[0],
260 &next_obs[0],&done[0],done.shape[0])
262 cpdef void clear(self) except *:
263 """
264 Clear replay buffer.
266 Parameters
267 ----------
269 Returns
270 -------
271 """
272 clear(self.buffer)
274 cpdef size_t get_stored_size(self):
275 """
276 Get stored size
278 Parameters
279 ----------
281 Returns
282 -------
283 size_t
284 stored size
285 """
286 return get_stored_size(self.buffer)
288 cpdef size_t get_next_index(self):
289 """
290 Get the next index to store
292 Parameters
293 ----------
295 Returns
296 -------
297 size_t
298 the next index to store
299 """
300 return get_next_index(self.buffer)
302 cpdef size_t get_stored_episode_size(self):
303 """
304 Get the size of stored episodes
306 Parameters
307 ----------
309 Returns
310 -------
311 size_t
312 the size of stored episodes
313 """
314 return self.buffer.get_stored_episode_size()
316 cpdef size_t delete_episode(self,i):
317 """
318 Delete specified episode
320 The environments stored after the specified episode are moved backward to fill the gap.
322 Parameters
323 ----------
324 i : int
325 the index of the episode to delete
327 Returns
328 -------
329 size_t
330 the number of environments (steps) in the deleted episode
331 """
332 return self.buffer.delete_episode(i)
334 def get_episode(self,i):
335 """
336 Get specified episode
338 Parameters
339 ----------
340 i : int
341 the index of the episode to extract
343 Returns
344 -------
345 dict of ndarray
346 the set of environment values in the i-th episode
347 """
348 cdef size_t len = 0
349 self.buffer.get_episode(i,len,
350 self.obs.ptr,self.act.ptr,self.rew.ptr,
351 self.next_obs.ptr,self.done.ptr)
352 if len == 0:
353 return {'obs': np.ndarray((0,self.obs_dim)),
354 'act': np.ndarray((0,self.act_dim)),
355 'rew': np.ndarray((0,self.rew_dim)),
356 'next_obs': np.ndarray((0,self.obs_dim)),
357 'done': np.ndarray(0)}
359 self._update_size(len)
360 return {'obs': self.obs.as_numpy(),
361 'act': self.act.as_numpy(),
362 'rew': self.rew.as_numpy(),
363 'next_obs': self.next_obs.as_numpy(),
364 'done': self.done.as_numpy()}
366 def _encode_sample(self,indexes):
367 self.buffer.get_buffer_pointers(self.obs.ptr,
368 self.act.ptr,
369 self.rew.ptr,
370 self.next_obs.ptr,
371 self.done.ptr)
372 cdef size_t buffer_size = self.get_buffer_size()
373 self._update_size(buffer_size)
374 return super()._encode_sample(indexes)
376@cython.embedsignature(True)
377cdef class SelectiveReplayBuffer(SelectiveEnvironment):
378 """
379 Replay buffer to store episodes of environment.
381 This class can get and delete an episode.
382 """
383 def __cinit__(self,episode_len,obs_dim=1,act_dim=1,*,Nepisodes=10,rew_dim=1,**kwargs):
384 pass
386 def __init__(self,episode_len,obs_dim=1,act_dim=1,*,Nepisodes=10,rew_dim=1,**kwargs):
387 """
388 Parameters
389 ----------
390 episode_len : int
391 the max size of a single episode
392 obs_dim : int
393 observation (obs, next_obs) dimension
394 act_dim : int
395 action (act) dimension
396 Nepisodes : int, optional
397 the max size of stored episodes whose default value is 10
398 rew_dim : int, optional
399 reward (rew) dimension whose default value is 1
400 """
401 pass
403 def sample(self,batch_size):
404 """
405 Sample the stored environments randomly with the specified size
407 Parameters
408 ----------
409 batch_size : int
410 sampled batch size
412 Returns
413 -------
414 dict of ndarray
415 Sampled batch transitions, which might contain the same transition
416 multiple times.
417 """
418 cdef idx = np.random.randint(0,self.get_stored_size(),batch_size)
419 return self._encode_sample(idx)
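# Usage sketch (illustrative values): store transitions episode by episode,
# then fetch or delete whole episodes by index.
#
# >>> srb = SelectiveReplayBuffer(episode_len=100, obs_dim=3, Nepisodes=10)
# >>> srb.add([0., 0., 0.], [1.], [0.5], [1., 1., 1.], [0.])
# >>> episode = srb.get_episode(0)   # dict of ndarray for the 0th episode
# >>> srb.delete_episode(0)          # later episodes are shifted backward
# >>> batch = srb.sample(32)         # uniform random sampling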
422cdef class SharedBuffer:
423 cdef dtype
424 cdef data
425 cdef data_ndarray
426 cdef view
427 cdef backend
428 def __init__(self, shape, dtype, data=None, ctx=None, backend="sharedctypes"):
429 self.dtype = np.dtype(dtype)
430 ctx = ctx or mp.get_context()
431 self.backend = backend
433 if data is None:
434 try:
435 ctype = np.ctypeslib.as_ctypes_type(self.dtype)
436 except NotImplementedError:
437 # Dirty hack to allocate correct size shared memory
438 for d in (np.int8, np.int16, np.int32, np.int64):
439 _d = np.dtype(d)
441 if self.dtype.itemsize == _d.itemsize:
442 ctype = np.ctypeslib.as_ctypes_type(_d)
443 break
444 else:
445 raise
447 length = int(np.array(shape,copy=False,dtype="int").prod())
448 self.data = RawArray(ctx,ctype,length,self.backend)
449 else:
450 self.data = data
452 self.data_ndarray = self.data.ndarray
453 self.data_ndarray.shape = shape
455 # Reinterpretation
456 if self.dtype != self.data_ndarray.dtype:
457 self.view = self.data_ndarray.view(self.dtype)
458 else:
459 self.view = self.data_ndarray
462 def __getitem__(self,key):
463 return self.view[key]
465 def __setitem__(self,key,value):
466 self.view[key] = value
468 def __reduce__(self):
469 return (SharedBuffer,(self.view.shape,self.dtype,self.data,None,self.backend))
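# Example (minimal sketch, assuming ``import pickle``): ``SharedBuffer`` is a
# NumPy view over shared memory, and ``__reduce__`` keeps the memory shared
# across process boundaries instead of copying it.
#
# >>> sb = SharedBuffer((4, 3), np.float32)
# >>> sb[0] = 1.0
# >>> sb2 = pickle.loads(pickle.dumps(sb))  # views the same underlying memory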
472def dict2buffer(buffer_size: int,env_dict: Dict,*,
473 stack_compress = None, default_dtype = None,
474 mmap_prefix: Optional[str] = None,
475 shared: Optional[str] = None,
476 ctx = None):
477 """Create buffer from env_dict
479 Parameters
480 ----------
481 buffer_size : int
482 buffer size
483 env_dict : dict of dict
484 Specify environment values to be stored in buffer.
485 stack_compress : str or array like of str, optional
486 compress memory of specified stacked values.
487 default_dtype : numpy.dtype, optional
488 fallback dtype for values not specified in `env_dict`. The default is numpy.single.
489 mmap_prefix : str, optional
490 File name prefix to save buffer data using mmap. If `None` (default),
491 keep the buffer only in memory.
493 Returns
494 -------
495 buffer : dict of numpy.ndarray
496 buffer for environment specified by env_dict.
497 """
498 cdef buffer = {}
499 cdef bool compress_any = (stack_compress is not None)
500 default_dtype = default_dtype or np.single
502 def zeros(name,shape,dtype):
503 if shared:
504 return SharedBuffer(shape,dtype, ctx=ctx, backend=shared)
506 if mmap_prefix:
507 if not isinstance(shape,tuple):
508 shape = tuple(shape)
509 return np.memmap(f"{mmap_prefix}_{name}.dat",
510 shape=shape,dtype=dtype,mode="w+")
511 else:
512 return np.zeros(shape=shape,dtype=dtype)
514 for name, defs in env_dict.items():
515 shape = np.insert(np.asarray(defs.get("shape",1)),0,buffer_size)
517 if compress_any and np.isin(name,
518 stack_compress,
519 assume_unique=True).any():
520 buffer_shape = np.insert(np.delete(shape,-1),1,shape[-1])
521 buffer_shape[0] += buffer_shape[1] - 1
522 buffer_shape[1] = 1
523 memory = zeros(name, buffer_shape,
524 dtype=defs.get("dtype",default_dtype))
525 strides = np.append(np.delete(memory.strides,1),memory.strides[1])
526 buffer[name] = np.lib.stride_tricks.as_strided(memory,
527 shape=shape,
528 strides=strides)
529 else:
530 buffer[name] = zeros(name,shape,dtype=defs.get("dtype",default_dtype))
532 buffer[name][:] = 1
534 shape[0] = -1
535 defs["add_shape"] = shape
537 return buffer
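# Example (illustrative): one ndarray is allocated per name, and an
# "add_shape" entry is added to ``env_dict`` as a side effect.
#
# >>> env_dict = {"obs": {"shape": 3}, "done": {}}
# >>> buf = dict2buffer(256, env_dict)
# >>> buf["obs"].shape               # (256, 3)
# >>> env_dict["obs"]["add_shape"]   # array([-1,  3])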
539def find_array(dict,key):
540 """Find 'key' and ensure numpy.ndarray with the minimum dimension of 1.
542 Parameters
543 ----------
544 dict : dict
545 dict in which to find 'key'
546 key : str
547 dictionary key to find
549 Returns
550 -------
551 : numpy.ndarray or None
552 If `dict` has `key`, returns the value as numpy.ndarray with a minimum
553 dimension of 1. Otherwise, returns `None`.
554 """
555 return None if key not in dict else np.array(dict[key],ndmin=1,copy=False)
557@cython.embedsignature(True)
558cdef class StepChecker:
559 """Check the step size of addition
560 """
561 cdef check_str
562 cdef check_shape
564 def __init__(self,env_dict,special_keys = None):
565 """Initialize StepChecker class.
567 Parameters
568 ----------
569 env_dict : dict
570 Specify the environment values.
571 """
572 special_keys = special_keys or []
573 for name, defs in env_dict.items():
574 if name in special_keys:
575 continue
576 self.check_str = name
577 self.check_shape = defs["add_shape"]
579 cdef size_t step_size(self,kwargs) except *:
580 """Return step size.
582 Parameters
583 ----------
584 kwargs: dict
585 Added values.
586 """
587 return np.reshape(np.array(kwargs[self.check_str], copy=False),
588 self.check_shape,order='A').shape[0]
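# Sketch of the step-size inference (``step_size`` is ``cdef``, shown here
# only conceptually): with an ``add_shape`` of (-1, 3), an ``obs`` argument
# of shape (2, 3) is reshaped to (2, 3) and yields a step size of 2, while a
# flat [1., 2., 3.] is reshaped to (1, 3) and yields 1.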
590@cython.embedsignature(True)
591cdef class NstepBuffer:
592 """Local buffer class for Nstep reward.
594 This buffer temporarily stores environment values and returns Nstep-modified
595 environment values for `ReplayBuffer`
596 """
597 cdef buffer
598 cdef size_t buffer_size
599 cdef default_dtype
600 cdef size_t stored_size
601 cdef size_t Nstep_size
602 cdef float Nstep_gamma
603 cdef Nstep_rew
604 cdef Nstep_next
605 cdef env_dict
606 cdef stack_compress
607 cdef StepChecker size_check
609 def __cinit__(self,env_dict=None,Nstep=None,*,
610 stack_compress = None,default_dtype = None,next_of = None):
611 self.env_dict = env_dict.copy() if env_dict else {}
612 self.stored_size = 0
613 self.stack_compress = None # stack_compress is not supported yet.
614 self.default_dtype = default_dtype or np.single
616 if next_of is not None: # next_of is not supported yet.
617 for name in np.array(next_of,copy=False,ndmin=1):
618 self.env_dict[f"next_{name}"] = self.env_dict[name]
620 self.Nstep_size = Nstep["size"]
621 self.Nstep_gamma = Nstep.get("gamma",0.99)
622 self.Nstep_rew = find_array(Nstep,"rew")
623 self.Nstep_next = find_array(Nstep,"next")
625 self.buffer_size = self.Nstep_size - 1
626 self.buffer = dict2buffer(self.buffer_size,self.env_dict,
627 stack_compress = self.stack_compress,
628 default_dtype = self.default_dtype)
629 self.size_check = StepChecker(self.env_dict)
631 def __init__(self,env_dict=None,Nstep=None,*,
632 stack_compress = None,default_dtype = None, next_of = None):
633 r"""Initialize NstepBuffer class.
635 Parameters
636 ----------
637 env_dict : dict
638 Specify environment values to be stored.
639 Nstep : dict
640 `Nstep["size"]` is `int` specifying step size of Nstep reward.
641 `Nstep["rew"]` is `str` or array like of `str` specifying
642 Nstep reward to be summed. `Nstep["gamma"]` is float specifying
643 discount factor, its default is 0.99. `Nstep["next"]` is `str` or
644 list of `str` specifying next values to be moved.
645 stack_compress : str or array like of str, optional
646 compress memory of specified stacked values.
647 default_dtype : numpy.dtype, optional
648 fallback dtype for not specified in `env_dict`. default is numpy.single
649 next_of : str or array like of str, optional
650 next item of specified environment variables (e.g. next_obs for obs) are
651 also sampled without duplicated values
653 Notes
654 -----
655 Currently, memory compression features (`stack_compress` and `next_of`) are
656 not supported yet. (Fall back to usual storing)
657 """
658 pass
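# Example (hedged sketch): a 4-step configuration where "rew" is discounted
# and summed over up to 4 steps and "next_obs" is shifted forward.
#
# >>> nb = NstepBuffer({"obs": {"shape": 3}, "act": {}, "rew": {},
# ...                   "next_obs": {"shape": 3}, "done": {}},
# ...                  Nstep={"size": 4, "gamma": 0.99,
# ...                         "rew": "rew", "next": "next_obs"})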
660 def add(self,*,**kwargs):
661 r"""Add envronment into local buffer.
663 Parameters
664 ----------
665 **kwargs : keyword arguments
666 Values to be added.
668 Returns
669 -------
670 env : dict or None
671 Values with Nstep reward calculated. When the local buffer has not
672 stored enough items yet, returns `None`.
673 """
674 cdef size_t N = self.size_check.step_size(kwargs)
675 cdef ssize_t end = self.stored_size + N
677 cdef ssize_t i
678 cdef ssize_t stored_begin
679 cdef ssize_t stored_end
680 cdef ssize_t ext_begin
681 cdef ssize_t max_slide
683 # Case 1
684 # If the Nstep buffer doesn't become full, store all the input transitions.
685 # These transitions are partially calculated.
686 if end <= self.buffer_size:
687 for name, stored_b in self.buffer.items():
688 if self.Nstep_rew is not None and np.isin(name,self.Nstep_rew).any():
689 # Calculate later.
690 pass
691 elif (self.Nstep_next is not None
692 and np.isin(name,self.Nstep_next).any()):
693 # Do nothing.
694 pass
695 else:
696 stored_b[self.stored_size:end] = self._extract(kwargs,name)
698 # Nstep reward must be calculated after "done" filling
699 gamma = (1.0 - self.buffer["done"][:end]) * self.Nstep_gamma
701 if self.Nstep_rew is not None:
702 max_slide = min(self.Nstep_size - self.stored_size,N)
703 max_slide *= -1
704 for name in self.Nstep_rew:
705 ext_b = self._extract(kwargs,name).copy()
706 self.buffer[name][self.stored_size:end] = ext_b
708 for i in range(self.stored_size-1,max_slide,-1):
709 stored_begin = max(i,0)
710 stored_end = i+N
711 ext_begin = max(-i,0)
712 ext_b[ext_begin:] *= gamma[stored_begin:stored_end]
713 self.buffer[name][stored_begin:stored_end] += ext_b[ext_begin:]
715 self.stored_size = end
716 return None
718 # Case 2
719 # If we have enough transitions, return calculated transitions
720 cdef size_t diff_N = self.buffer_size - self.stored_size
721 cdef size_t add_N = N - diff_N
722 cdef bool NisBigger = (add_N > self.buffer_size)
723 end = self.buffer_size if NisBigger else add_N
725 # Nstep reward must be calculated before "done" filling
726 cdef ssize_t spilled_N
727 gamma = np.ones((self.stored_size + N,1),dtype=np.single)
728 gamma[:self.stored_size] -= self.buffer["done"][:self.stored_size]
729 gamma[self.stored_size:] -= self._extract(kwargs,"done")
730 gamma *= self.Nstep_gamma
731 if self.Nstep_rew is not None:
732 max_slide = min(self.Nstep_size - self.stored_size,N)
733 max_slide *= -1
734 for name in self.Nstep_rew:
735 stored_b = self.buffer[name]
736 ext_b = self._extract(kwargs,name)
738 copy_ext = ext_b.copy()
739 if diff_N:
740 stored_b[self.stored_size:] = ext_b[:diff_N]
741 ext_b = ext_b[diff_N:]
743 for i in range(self.stored_size-1,max_slide,-1):
744 stored_begin = max(i,0)
745 stored_end = i+N
746 ext_begin = max(-i,0)
747 copy_ext[ext_begin:] *= gamma[stored_begin:stored_end]
748 if stored_end <= self.buffer_size:
749 stored_b[stored_begin:stored_end] += copy_ext[ext_begin:]
750 else:
751 spilled_N = stored_end - self.buffer_size
752 stored_b[stored_begin:] += copy_ext[ext_begin:-spilled_N]
753 ext_b[:spilled_N] += copy_ext[-spilled_N:]
755 self._roll(stored_b,ext_b,end,NisBigger,kwargs,name,add_N)
757 for name, stored_b in self.buffer.items():
758 if self.Nstep_rew is not None and np.isin(name,self.Nstep_rew).any():
759 # Calculated.
760 pass
761 elif (self.Nstep_next is not None
762 and np.isin(name,self.Nstep_next).any()):
763 kwargs[name] = self._extract(kwargs,name)[diff_N:]
764 else:
765 ext_b = self._extract(kwargs,name)
767 if diff_N:
768 stored_b[self.stored_size:] = ext_b[:diff_N]
769 ext_b = ext_b[diff_N:]
771 self._roll(stored_b,ext_b,end,NisBigger,kwargs,name,add_N)
773 done = kwargs["done"]
775 for i in range(1,self.buffer_size):
776 if i <= add_N:
777 done[:-i] += kwargs["done"][i:]
778 done[-i:] += self.buffer["done"][:i]
779 else:
780 done += self.buffer["done"][i-add_N:i]
782 self.stored_size = self.buffer_size
783 return kwargs
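# Worked example (illustrative numbers): with ``Nstep = {"size": 3,
# "gamma": 0.99, "rew": "rew"}`` and three stored steps whose rewards are
# 1.0, 1.0, 1.0 with no ``done`` flag, the emitted reward for the first step
# is the discounted sum 1.0 + 0.99*1.0 + 0.99**2 * 1.0 = 2.9701. A ``done``
# at any step zeroes ``gamma`` beyond it, truncating the sum at the episode
# boundary.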
785 cdef _extract(self,kwargs,name):
786 _dict = self.env_dict[name]
787 return np.reshape(np.array(kwargs[name],copy=False,ndmin=2,
788 dtype=_dict.get("dtype",self.default_dtype)),
789 _dict["add_shape"])
791 cdef void _roll(self,stored_b,ext_b,
792 ssize_t end,bool NisBigger,kwargs,name,size_t add_N):
793 # Swap numpy.ndarray
794 copy_ext = ext_b.copy() # ext_b might be unwriteable, so copy it.
795 copy_ext[-end:] = stored_b[:end]
796 stored_b[:end] = ext_b[-end:]
798 if NisBigger:
799 # buffer: XXXX, add: YYYYY
800 # buffer: YYYY, add: YXXXX
801 copy_ext = np.roll(copy_ext,end,axis=0)
802 # buffer: YYYY, add: XXXXY
803 else:
804 # buffer: XXXZZZZ, add: YYY
805 # buffer: YYYZZZZ, add: XXX
806 stored_b[:] = np.roll(stored_b,-end,axis=0)[:]
807 # buffer: ZZZZYYY, add: XXX
808 kwargs[name] = copy_ext[:add_N]
810 cpdef void clear(self):
811 """Clear the bufer.
812 """
813 self.stored_size = 0
815 cpdef on_episode_end(self):
816 """Terminate episode.
817 """
818 kwargs = {k: v[:self.stored_size].copy() for k, v in self.buffer.items()}
819 done = kwargs["done"]
821 for i in range(1,self.stored_size):
822 done[:-i] += kwargs["done"][i:]
824 self.clear()
825 return kwargs
827 cpdef size_t get_Nstep_size(self):
828 """Get Nstep size
830 Returns
831 -------
832 Nstep_size : size_t
833 Nstep size
834 """
835 return self.Nstep_size
838cdef class RingBufferIndex:
839 """Ring Buffer Index class
840 """
841 cdef index
842 cdef buffer_size
843 cdef is_full
845 def __init__(self, buffer_size, ctx = None, backend = "sharedctypes"):
846 ctx = ctx or mp.get_context()
847 self.index = RawValue(ctx, ctypes.c_size_t, 0, backend)
848 self.buffer_size = RawValue(ctx, ctypes.c_size_t, buffer_size, backend)
849 self.is_full = RawValue(ctx, ctypes.c_int, 0, backend)
851 cdef size_t get_next_index(self):
852 return self.index.value
854 cdef size_t fetch_add(self,size_t N):
855 """
856 Add then return the original value (fetch-and-add).
858 Parameters
859 ----------
860 N : size_t
861 value to add
863 Returns
864 -------
865 size_t
866 index before add
867 """
868 cdef size_t ret = self.index.value
869 self.index.value += N
871 if self.index.value >= self.buffer_size.value:
872 self.is_full.value = 1
874 while self.index.value >= self.buffer_size.value:
875 self.index.value -= self.buffer_size.value
877 return ret
879 cdef void clear(self):
880 self.index.value = 0
881 self.is_full.value = 0
883 cdef size_t get_stored_size(self):
884 if self.is_full.value:
885 return self.buffer_size.value
886 else:
887 return self.index.value
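# Example (sketch of the wraparound arithmetic) with ``buffer_size = 5``:
#
# fetch_add(3) -> returns 0, index becomes 3
# fetch_add(3) -> returns 3, index becomes 6, is_full is set, index wraps to 1
# get_stored_size() -> 5, get_next_index() -> 1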
890cdef class ProcessSafeRingBufferIndex(RingBufferIndex):
891 """Process Safe Ring Buffer Index class
892 """
893 cdef lock
895 def __init__(self, buffer_size, ctx=None, backend="sharedctypes"):
896 ctx = ctx or mp.get_context()
897 super().__init__(buffer_size, ctx, backend)
898 self.lock = ctx.Lock()
900 cdef size_t get_next_index(self):
901 with self.lock:
902 return RingBufferIndex.get_next_index(self)
904 cdef size_t fetch_add(self,size_t N):
905 with self.lock:
906 return RingBufferIndex.fetch_add(self,N)
908 cdef void clear(self):
909 with self.lock:
910 RingBufferIndex.clear(self)
912 cdef size_t get_stored_size(self):
913 with self.lock:
914 return RingBufferIndex.get_stored_size(self)
917@cython.embedsignature(True)
918cdef class ReplayBuffer:
919 r"""Replay Buffer class to store transitions and to sample them randomly.
921 The transition can contain anything compatible with NumPy data
922 type. Users can freely specify them by the ``env_dict`` parameter at
923 the constructor.
925 The possible standard transition contains observation (``obs``),
926 action (``act``), reward (``rew``), the next observation
927 (``next_obs``), and done (``done``).
929 >>> env_dict = {"obs": {"shape": (4,4)},
930 ... "act": {"shape": 3, "dtype": np.int16},
931 ... "rew": {},
932 ... "next_obs": {"shape": (4,4)},
933 ... "done": {}}
935 In this class, sampling is random sampling and the same transition
936 can be chosen multiple times."""
937 cdef buffer
938 cdef size_t buffer_size
939 cdef env_dict
940 cdef RingBufferIndex index
941 cdef size_t episode_len
942 cdef next_of
943 cdef bool has_next_of
944 cdef next_
945 cdef bool compress_any
946 cdef stack_compress
947 cdef cache
948 cdef default_dtype
949 cdef StepChecker size_check
950 cdef NstepBuffer nstep
951 cdef bool use_nstep
952 cdef size_t cache_size
954 def __cinit__(self,size,env_dict=None,*,
955 next_of=None,stack_compress=None,default_dtype=None,Nstep=None,
956 mmap_prefix =None,
957 **kwargs):
958 self.env_dict = env_dict.copy() if env_dict else {}
959 cdef special_keys = []
961 self.buffer_size = size
962 self.index = RingBufferIndex(self.buffer_size)
963 self.episode_len = 0
965 self.compress_any = stack_compress
966 self.stack_compress = np.array(stack_compress,ndmin=1,copy=False)
968 self.default_dtype = default_dtype or np.single
970 self.has_next_of = next_of
971 self.next_of = np.array(next_of,
972 ndmin=1,copy=False) if self.has_next_of else None
973 self.next_ = {}
974 self.cache = {} if (self.has_next_of or self.compress_any) else None
976 self.use_nstep = Nstep
977 if self.use_nstep:
978 self.nstep = NstepBuffer(self.env_dict,Nstep.copy(),
979 stack_compress = self.stack_compress,
980 next_of = self.next_of,
981 default_dtype = self.default_dtype)
983 # Nstep does not support next_of yet
984 self.next_of = None
985 self.has_next_of = False
987 # side effect: Add "add_shape" key into self.env_dict
988 self.buffer = dict2buffer(self.buffer_size,self.env_dict,
989 stack_compress = self.stack_compress,
990 default_dtype = self.default_dtype,
991 mmap_prefix = mmap_prefix)
993 self.size_check = StepChecker(self.env_dict,special_keys)
995 # Cache Size:
996 # No "next_of" nor "stack_compress": -> 0
997 # If "stack_compress": -> max of stack size -1
998 # If "next_of": -> Increase by 1
999 self.cache_size = 1 if (self.cache is not None) else 0
1000 if self.compress_any:
1001 for name in self.stack_compress:
1002 self.cache_size = max(self.cache_size,
1003 np.array(self.env_dict[name]["shape"],
1004 ndmin=1,copy=False)[-1] -1)
1006 if self.has_next_of:
1007 self.cache_size += 1
1008 for name in self.next_of:
1009 self.next_[name] = self.buffer[name][0].copy()
1011 def __init__(self,size,env_dict=None,*,
1012 next_of=None,stack_compress=None,default_dtype=None,Nstep=None,
1013 mmap_prefix =None,
1014 **kwargs):
1015 r"""Initialize ``ReplayBuffer``
1017 Parameters
1018 ----------
1019 size : int
1020 Buffer size
1021 env_dict : dict of dict, optional
1022 Dictionary specifying environments. The keys of ``env_dict`` become
1023 environment names. The values of ``env_dict``, which are also ``dict``,
1024 defines ``"shape"`` (default ``1``) and ``"dtype"`` (fallback to
1025 ``default_dtype``)
1026 next_of : str or array like of str, optional
1027 Value names whose next items share memory region.
1028 The ``"next_"`` prefixed items (eg. ``next_obs`` for ``obs``) are
1029 automatically added to ``env_dict`` without duplicated memory.
1030 stack_compress : str or array like of str, optional
1031 Value names whose duplicated stack dimension is compressed.
1032 The values must have stacked dimension at the last dimension.
1033 default_dtype : numpy.dtype, optional
1034 Fallback dtype. The default value is ``numpy.single``
1035 Nstep : dict, optional
1036 If this option is specified, Nstep reward is used.
1037 ``Nstep["size"]`` is ``int`` specifying step size of Nstep reward.
1038 ``Nstep["rew"]`` is ``str`` or array like of ``str`` specifying
1039 Nstep reward to be summed. ``Nstep["gamma"]`` is float specifying
1040 discount factor, its default is ``0.99``. ``Nstep["next"]`` is ``str`` or
1041 list of ``str`` specifying next values to be moved.
1042 When this option is enabled, ``"done"`` is required at ``env_dict``.
1043 mmap_prefix : str, optional
1044 File name prefix to map buffer data using mmap. If ``None`` (default),
1045 keeps the buffer only in memory. This feature is designed for very large
1046 data which cannot fit in physical memory.
1049 Examples
1050 --------
1051 Create simple replay buffer with buffer size of :math:`10^6`.
1053 >>> rb = ReplayBuffer(1e+6,
1054 ... {"obs": {"shape": 3}, "act": {}, "rew": {},
1055 ... "next_obs": {"shape": 3}, "done": {}})
1057 Create replay buffer with ``np.float64``, but only ``"act"`` is ``np.int8``.
1059 >>> rb = ReplayBuffer(1e+6,
1060 ... {"obs": {"shape": 3}, "act": {"dtype": np.int8},
1061 ... "rew": {},
1062 ... "next_obs": {"shape": 3}, "done": {}},
1063 ... default_dtype = np.float64)
1065 Create replay buffer with ``next_of`` memory compression for ``"obs"``.
1066 In this example, ``"next_obs"`` is automatically added and shares the memory
1067 with ``"obs"``.
1069 >>> rb = ReplayBuffer(1e+6,
1070 ... {"obs": {"shape": 3}, "act": {}, "rew": {}, "done": {}},
1071 ... next_of="obs")
1073 Create replay buffer with ``stack_compress`` memory compression for ``"obs"``.
1074 The stacked data must be a sliding window over sequential data, and the last
1075 dimension is the stack dimension.
1077 >>> rb = ReplayBuffer(1e+6,
1078 ... {"obs": {"shape": (3,2)}},
1079 ... stack_compress="obs")
1080 >>> rb.add(obs=[[1,2],
1081 ... [1,2],
1082 ... [1,2]])
1083 0
1084 >>> rb.add(obs=[[2,3],
1085 ... [2,3],
1086 ... [2,3]])
1087 1
1089 Create very large replay buffer mapping on disk.
1091 >>> rb = ReplayBuffer(1e+9, {"obs": {"shape": 3}}, mmap_prefix="rb_data")
1092 """
1093 pass
1095 def add(self,*,**kwargs):
1096 r"""Add transition(s) into replay buffer.
1098 Multiple sets of transitions can be added simultaneously.
1100 Parameters
1101 ----------
1102 **kwargs : array like or float or int
1103 Transitions to be stored.
1105 Returns
1106 -------
1107 int or None
1108 The first index of stored position. If all transitions are stored
1109 into ``NstepBuffer`` and no transitions are stored into the main buffer,
1110 ``None`` is returned.
1112 Raises
1113 ------
1114 KeyError
1115 If any values defined at constructor are missing.
1117 Warnings
1118 --------
1119 All values must be passed by key-value style (keyword arguments).
1120 It is the user's responsibility to ensure that all the values have the same step-size.
1123 Examples
1124 --------
1125 >>> rb = ReplayBuffer(1e+6, {"obs": {"shape": 3}})
1127 Add a single transition: ``[1,2,3]``.
1129 >>> rb.add(obs=[1,2,3])
1131 Add three step sequential transitions: ``[1,2,3]``, ``[4,5,6]``,
1132 and ``[7,8,9]`` simultaneously.
1134 >>> rb.add(obs=[[1,2,3],
1135 ... [4,5,6],
1136 ... [7,8,9]])
1137 """
1138 if self.use_nstep:
1139 kwargs = self.nstep.add(**kwargs)
1140 if kwargs is None:
1141 return
1143 cdef size_t N = self.size_check.step_size(kwargs)
1145 cdef size_t index = self.index.fetch_add(N)
1146 cdef size_t end = index + N
1147 cdef size_t remain = 0
1148 cdef add_idx = np.arange(index,end)
1149 cdef size_t key_min = 0
1151 if end > self.buffer_size:
1152 remain = end - self.buffer_size
1153 add_idx[add_idx >= self.buffer_size] -= self.buffer_size
1155 if self.cache is not None:
1156 for _i in add_idx:
1157 self.cache.pop(_i, None)
1159 if self.compress_any and (remain or
1160 self.get_stored_size() == self.buffer_size):
1161 key_min = remain or end
1162 for key in range(key_min,
1163 min(key_min + self.cache_size, self.buffer_size)):
1164 self.add_cache_i(key, index)
1166 for name, b in self.buffer.items():
1167 b[add_idx] = np.reshape(np.array(kwargs[name],copy=False,ndmin=2),
1168 self.env_dict[name]["add_shape"])
1170 if self.has_next_of:
1171 for name in self.next_of:
1172 self.next_[name][...]=np.reshape(np.array(kwargs[f"next_{name}"],
1173 copy=False,
1174 ndmin=2),
1175 self.env_dict[name]["add_shape"])[-1]
1177 self.episode_len += N
1178 return index
1180 def get_all_transitions(self,shuffle: bool=False):
1181 r"""
1182 Get all transitions stored in replay buffer.
1184 Parameters
1185 ----------
1186 shuffle : bool, optional
1187 When ``True``, transitions are shuffled. The default value is ``False``.
1189 Returns
1190 -------
1191 transitions : dict of numpy.ndarray
1192 All transitions stored in this replay buffer.
1193 """
1194 idx = np.arange(self.get_stored_size())
1196 if shuffle:
1197 np.random.shuffle(idx)
1199 return self._encode_sample(idx)
1201 def save_transitions(self, file, *, safe=True):
1202 r"""
1203 Save transitions to file
1205 Parameters
1206 ----------
1207 file : str or file-like object
1208 File to write data
1209 safe : bool, optional
1210 If ``False``, we try more aggressive compression
1211 which might encounter future incompatibility.
1212 """
1213 FORMAT_VERSION = 1
1214 if (safe or not (self.compress_any or self.has_next_of)):
1215 data = {"safe": True,
1216 "version": FORMAT_VERSION,
1217 "data": self.get_all_transitions(),
1218 "Nstep": self.is_Nstep(),
1219 "cache": None,
1220 "next_of": None}
1221 else:
1222 self.add_cache()
1223 N = self.get_stored_size()
1224 if N == self.get_buffer_size():
1225 b = self.buffer
1226 else:
1227 b = {k: v[:N] for k, v in self.buffer.items()}
1229 data = {"safe": False,
1230 "version": FORMAT_VERSION,
1231 "data": b,
1232 "Nstep": self.is_Nstep(),
1233 "cache": self.cache,
1234 "next_of": self.next_of}
1235 np.savez_compressed(file, **data)
1237 def _load_transitions_v1(self, data):
1238 d = unwrap(data["data"])
1239 N = data["Nstep"]
1241 if data["safe"]:
1242 if N:
1243 self.use_nstep = False
1244 self.add(**d)
1245 self.use_nstep = True
1246 else:
1247 self.add(**d)
1248 return None
1250 c = unwrap(data["cache"])
1251 n = unwrap(data["next_of"])
1253 cache_idx = np.sort([i for i in c.keys()])
1255 for k, v in d.items():
1256 _size = v.shape[0]
1257 break
1259 if N:
1260 self.use_nstep = False
1262 idx = 0
1263 for i in cache_idx:
1264 if idx < i:
1265 merge = {k: v[idx:i] for k,v in d.items()}
1266 if n is not None:
1267 merge = {**merge, **{f"next_{k}": d[k][idx+1:i+1] for k in n}}
1268 self.add(**merge)
1269 merge = {**{k: v[i] for k,v in d.items()}, **c[i]}
1270 self.add(**merge)
1271 self.on_episode_end()
1272 idx = i+1
1274 if idx < _size:
1275 if idx < _size - 1:
1276 merge = {k: v[idx:_size-1] for k,v in d.items()}
1277 if n is not None:
1278 merge = {**merge, **{f"next_{k}": d[k][idx+1:_size] for k in n}}
1279 self.add(**merge)
1281 merge = {k: v[_size-1] for k,v in d.items()}
1282 if n is not None:
1283 merge = {**merge, **{f"next_{k}": d[k][0] for k in n}}
1284 self.add(**merge)
1286 if N:
1287 self.use_nstep = True
1289 def load_transitions(self, file):
1290 r"""
1291 Load transitions from file
1293 Parameters
1294 ----------
1295 file : str or file-like object
1296 File to read data
1298 Raises
1299 ------
1300 ValueError
1301 When file format is wrong.
1303 Warnings
1304 --------
1305 In order to avoid security vulnerability,
1306 you **must not** load untrusted file, since this method is
1307 based on ``pickle``.
1308 """
1309 with np.load(file, allow_pickle=True) as data:
1310 version = data["version"]
1311 N = data["Nstep"]
1313 if (N and not self.is_Nstep()) or (not N and self.is_Nstep()):
1314 raise ValueError(f"Stored data and Buffer mismatch for Nstep")
1316 if version == 1:
1317 self._load_transitions_v1(data)
1318 else:
1319 raise ValueError(f"Unknown Format Version: {version}")
1321 def _encode_sample(self,idx):
1322 cdef sample = {}
1323 cdef next_idx
1324 cdef cache_idx
1325 cdef bool use_cache
1327 idx = np.array(idx,copy=False,ndmin=1)
1328 for name, b in self.buffer.items():
1329 sample[name] = b[idx]
1331 if self.has_next_of:
1332 next_idx = idx + 1
1333 next_idx[next_idx == self.get_buffer_size()] = 0
1334 cache_idx = (next_idx == self.get_next_index())
1335 use_cache = cache_idx.any()
1337 for name in self.next_of:
1338 sample[f"next_{name}"] = self.buffer[name][next_idx]
1339 if use_cache:
1340 # Cache for the latest "next_***" stored at `self.next_`
1341 sample[f"next_{name}"][cache_idx] = self.next_[name]
1343 cdef size_t i,_i
1344 cdef size_t N = idx.shape[0]
1345 if self.cache is not None:
1346 # Cache for episode ends stored at `self.cache`
1347 for _i in range(N):
1348 i = idx[_i]
1349 if i in self.cache:
1350 if self.has_next_of:
1351 for name in self.next_of:
1352 sample[f"next_{name}"][_i] = self.cache[i][f"next_{name}"]
1353 if self.compress_any:
1354 for name in self.stack_compress:
1355 sample[name][_i] = self.cache[i][name]
1357 return sample
1359 def sample(self,batch_size):
1360 r"""Sample the stored transitions randomly with specified size
1362 Parameters
1363 ----------
1364 batch_size : int
1365 sampled batch size
1367 Returns
1368 -------
1369 sample : dict of ndarray
1370 Sampled batch transitions, which might contain
1371 the same transition multiple times.
1373 Examples
1374 --------
1375 >>> rb = ReplayBuffer(1e+6, {"obs": {"shape": 3}})
1376 >>> rb.add(obs=[1,2,3])
1377 >>> rb.add(obs=[[1,2,3],[1,2,3]])
1378 >>> rb.sample(4)
1379 {'obs': array([[1., 2., 3.],
1380 [1., 2., 3.],
1381 [1., 2., 3.],
1382 [1., 2., 3.]], dtype=float32)}
1383 """
1384 cdef idx = np.random.randint(0,self.get_stored_size(),batch_size)
1385 return self._encode_sample(idx)
1387 cpdef void clear(self) except *:
1388 r"""Clear replay buffer.
1390 Set ``index`` and ``stored_size`` to ``0``.
1392 Example
1393 -------
1394 >>> rb = ReplayBuffer(5,{"done": {}})
1395 >>> rb.add(done=1)
1396 >>> rb.get_stored_size()
1397 1
1398 >>> rb.get_next_index()
1399 1
1400 >>> rb.clear()
1401 >>> rb.get_stored_size()
1402 0
1403 >>> rb.get_next_index()
1404 0
1405 """
1406 self.index.clear()
1407 self.episode_len = 0
1409 self.cache = {} if (self.has_next_of or self.compress_any) else None
1411 if self.use_nstep:
1412 self.nstep.clear()
1414 cpdef size_t get_stored_size(self):
1415 r"""Get stored size
1417 Returns
1418 -------
1419 size_t
1420 stored size
1421 """
1422 return self.index.get_stored_size()
1424 cpdef size_t get_buffer_size(self):
1425 r"""Get buffer size
1427 Returns
1428 -------
1429 size_t
1430 buffer size
1431 """
1432 return self.buffer_size
1434 cpdef size_t get_next_index(self):
1435 r"""Get the next index to store
1437 Returns
1438 -------
1439 size_t
1440 the next index to store
1441 """
1442 return self.index.get_next_index()
1444 cdef void add_cache(self):
1445 r"""Add last items into cache
1447 The last items for ``next_of`` and ``stack_compress`` optimization
1448 are moved to cache area.
1450 If ``self.cache is None``, do nothing.
1451 If ``self.stored_size == 0``, do nothing.
1452 """
1454 # If no cache configuration, do nothing
1455 if self.cache is None:
1456 return
1458 # If nothing is stored, do nothing
1459 if self.get_stored_size() == 0:
1460 return
1462 cdef size_t key_end = (self.get_next_index() or self.buffer_size)
1463 # Next index (without wraparound): key_end in [1,...,self.buffer_size]
1465 cdef size_t key_min = 0
1466 cdef size_t max_cache = min(self.cache_size,self.episode_len)
1467 if key_end > max_cache:
1468 key_min = key_end - max_cache
1470 cdef size_t key = 0
1471 cdef size_t next_key = 0
1472 for key in range(key_min, key_end): # key_end is excluded
1473 self.add_cache_i(key, key_end)
1475 cdef void add_cache_i(self, size_t key, size_t key_end):
1476 # If key is already cached, don't do anything
1477 if key in self.cache:
1478 return
1480 cdef size_t next_key = key + 1
1481 cdef cache_key = {}
1483 if self.has_next_of:
1484 if next_key == key_end:
1485 for name, value in self.next_.items():
1486 cache_key[f"next_{name}"] = value.copy()
1487 else:
1488 for name in self.next_.keys():
1489 cache_key[f"next_{name}"] = self.buffer[name][next_key].copy()
1491 if self.compress_any:
1492 for name in self.stack_compress:
1493 cache_key[name] = self.buffer[name][key].copy()
1495 self.cache[key] = cache_key
1497 cpdef void on_episode_end(self) except *:
1498 r"""Call on episode end
1500 Finalize the current episode by moving remaining Nstep buffer transitions,
1501 evacuating overlapped data for memory compression features, and resetting
1502 episode length.
1504 Notes
1505 -----
1506 Calling this function at episode end is the user's responsibility,
1507 since episode exploration can be terminated at a certain length
1508 even though no ``done`` flag is set by the environment.
1509 """
1510 if self.use_nstep:
1511 self.use_nstep = False
1512 self.add(**self.nstep.on_episode_end())
1513 self.use_nstep = True
1515 self.add_cache()
1517 self.episode_len = 0
1519 cpdef size_t get_current_episode_len(self):
1520 r"""Get current episode length
1522 Returns
1523 -------
1524 size_t
1525 Current episode length
1526 """
1527 return self.episode_len
1529 cpdef bool is_Nstep(self):
1530 r"""Get whether use Nstep or not
1532 Returns
1533 -------
1534 bool
1535 Whether Nstep is used
1536 """
1537 return self.use_nstep
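# Example (hedged sketch): with Nstep enabled, ``add`` returns ``None`` while
# transitions are still cached inside the internal ``NstepBuffer``;
# ``on_episode_end()`` flushes the remainder into the main buffer.
#
# >>> rb = ReplayBuffer(1000,
# ...                   {"obs": {"shape": 3}, "rew": {},
# ...                    "next_obs": {"shape": 3}, "done": {}},
# ...                   Nstep={"size": 4, "gamma": 0.99,
# ...                          "rew": "rew", "next": "next_obs"})
# >>> rb.add(obs=[1, 2, 3], rew=1, next_obs=[1, 2, 3], done=0)  # -> None
# >>> rb.on_episode_end()  # flush cached transitions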
1539@cython.embedsignature(True)
1540cdef class PrioritizedReplayBuffer(ReplayBuffer):
1541 r"""Prioritized Replay Buffer class to store transitions with priorities.
1543 In this class, transitions are sampled with probabilities corresponding to their priorities.
1545 Notes
1546 -----
1547 In Prioritized Experience Replay (PER) [1]_, transitions are sampled
1548 with probabilities calculated from TD error. This class implements
1549 proportional variant where :math:`p_i = (|TD|_i + \varepsilon)^{\alpha}`.
1551 References
1552 ----------
1553 .. [1] T. Schaul et al, "Prioritized Experience Replay", ICLR (2016),
1554 https://arxiv.org/abs/1511.05952
1555 """
1556 cdef VectorFloat weights
1557 cdef VectorSize_t indexes
1558 cdef float alpha
1559 cdef CppPrioritizedSampler[float]* per
1560 cdef NstepBuffer priorities_nstep
1561 cdef bool check_for_update
1562 cdef bool [:] unchange_since_sample
1563 cdef vector[size_t] idx_vec
1564 cdef vector[float] ps_vec
1566 def __cinit__(self,size,env_dict=None,*,alpha=0.6,Nstep=None,eps=1e-4,
1567 check_for_update=False,**kwargs):
1568 self.alpha = alpha
1569 self.per = new CppPrioritizedSampler[float](size,alpha)
1570 self.per.set_eps(eps)
1571 self.weights = VectorFloat()
1572 self.indexes = VectorSize_t()
1574 if self.use_nstep:
1575 self.priorities_nstep = NstepBuffer({"priorities": {"dtype": np.single},
1576 "done": {}},
1577 {"size": Nstep["size"]})
1579 self.check_for_update = check_for_update
1580 if self.check_for_update:
1581 self.unchange_since_sample = np.ones(np.array(size,
1582 copy=False,
1583 dtype='int'),
1584 dtype='bool')
1586 self.idx_vec = vector[size_t]()
1587 self.ps_vec = vector[float]()
1589 def __init__(self,size,env_dict=None,*,alpha=0.6,Nstep=None,eps=1e-4,
1590 check_for_update=False,**kwargs):
1591 r"""Initialize ``PrioritizedReplayBuffer``
1593 Parameters
1594 ----------
1595 size : int
1596 Buffer size
1597 env_dict : dict of dict, optional
1598 Dictionary specifying environments. The keys of ``env_dict`` become
1599 environment names. The values of ``env_dict``, which are also ``dict``,
1600 defines ``"shape"`` (default ``1``) and ``"dtype"``
1601 (fallback to ``default_dtype``)
1602 alpha : float, optional
1603 :math:`\alpha` the exponent of the stored priorities, whose
1604 default value is ``0.6``.
1605 eps : float, optional
1606 :math:`\epsilon` small positive constant to ensure error-less state
1607 will be sampled, whose default value is ``1e-4``.
1608 check_for_update : bool
1609 If the value is ``True`` (default value is ``False``),
1610 this buffer traces updated indices after the last calling of
1611 ``sample()`` method to avoid mis-updating priorities of already
1612 overwritten values. This feature is designed for multiprocess learning.
1614 See Also
1615 --------
1616 ReplayBuffer : Any optional parameters at ReplayBuffer are valid, too.
1619 Notes
1620 -----
1621 The minimum and summation over certain ranges of pre-calculated priorities
1622 :math:`(p_{i} + \epsilon )^{ \alpha }` are stored with segment tree, which
1623 enable fast sampling.
1624 """
1625 pass
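# Usage sketch (illustrative TD errors): unspecified priorities default to
# the current maximum; ``sample`` additionally returns "weights" and
# "indexes" for importance-sampling updates.
#
# >>> per = PrioritizedReplayBuffer(1000, {"obs": {"shape": 3}}, alpha=0.6)
# >>> per.add(obs=[1, 2, 3])                    # max priority is used
# >>> batch = per.sample(32, beta=0.4)
# >>> new_p = np.abs(np.random.randn(32))       # placeholder TD errors
# >>> per.update_priorities(batch["indexes"], new_p)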
1627 def add(self,*,priorities = None,**kwargs):
1628 r"""Add transition(s) into replay buffer.
1630 Multiple sets of transitions can be added simultaneously.
1632 Parameters
1633 ----------
1634 priorities : array like or float, optional
1635 Priorities of each environment. When no priorities are passed,
1636 the maximum priorities until then are used.
1637 **kwargs : array like or float or int
1638 Transitions to be stored.
1640 Returns
1641 -------
1642 int or None
1643 The first index of stored position. If all transitions are stored
1644 into ``NstepBuffer`` and no transitions are stored into the main buffer,
1645 ``None`` is returned.
1647 Raises
1648 ------
1649 KeyError
1650 If any values defined at constructor are missing.
1652 Warnings
1653 --------
1654 All values must be passed by key-value style (keyword arguments).
1655 It is the user's responsibility to ensure that all the values have the same step-size.
1656 """
1657 cdef size_t N = self.size_check.step_size(kwargs)
1658 if priorities is not None:
1659 priorities = np.ravel(np.array(priorities,copy=False,
1660 ndmin=1,dtype=np.single))
1661 if N != priorities.shape[0]:
1662 raise ValueError("`priorities` shape is incompatible")
1664 if self.use_nstep:
1665 if priorities is None:
1666 priorities = np.full((N),self.get_max_priority(),dtype=np.single)
1668 priorities = self.priorities_nstep.add(priorities=priorities,
1669 done=np.array(kwargs["done"],
1670 copy=True))
1671 if priorities is not None:
1672 priorities = np.ravel(priorities["priorities"])
1673 N = priorities.shape[0]
1675 cdef maybe_index = super().add(**kwargs)
1676 if maybe_index is None:
1677 return None
1679 cdef size_t index = maybe_index
1680 cdef const float [:] ps
1682 if priorities is not None:
1683 ps = np.ravel(np.array(priorities,copy=False,ndmin=1,dtype=np.single))
1684 self.per.set_priorities(index,&ps[0],N,self.get_buffer_size())
1685 else:
1686 self.per.set_priorities(index,N,self.get_buffer_size())
1688 if self.check_for_update:
1689 if index+N <= self.buffer_size:
1690 self.unchange_since_sample[index:index+N] = False
1691 else:
1692 self.unchange_since_sample[index:] = False
1693 self.unchange_since_sample[:index+N-self.buffer_size] = False
1695 return index
1697 def sample(self,batch_size,beta = 0.4):
1698 r"""Sample the stored transitions.
1700 Transitions are sampled depending on corresponding priorities
1701 with the specified size.
1703 Parameters
1704 ----------
1705 batch_size : int
1706 Sampled batch size
1707 beta : float, optional
1708 The exponent of weight for relaxation of importance
1709 sampling effect, whose default value is ``0.4``
1711 Returns
1712 -------
1713 dict of ndarray
1714 Sampled batch transitions which also includes
1715 ``"weights"`` and ``"indexes"``
1717 Notes
1718 -----
1719 When ``beta`` is ``0``, weights become uniform. When ``beta`` is ``1``,
1720 the weights become ordinary importance sampling weights.
1721 The ``weights`` are also normalized by the weight for the minimum priority
1722 (:math:`= w_{i}/\max_{j}(w_{j})`), which ensures the weights :math:`\leq` 1.
1723 """
1724 self.per.sample(batch_size,beta,
1725 self.weights.vec,self.indexes.vec,
1726 self.get_stored_size())
1727 cdef idx = self.indexes.as_numpy()
1728 samples = self._encode_sample(idx)
1729 samples['weights'] = self.weights.as_numpy()
1730 samples['indexes'] = idx
1732 if self.check_for_update:
1733 self.unchange_since_sample[:] = True
1735 return samples
1737 def update_priorities(self,indexes,priorities):
1738 r"""Update priorities
1740 Update priorities specified with indices. If this
1741 ``PrioritizedReplayBuffer`` is constructed with
1742 ``check_for_update=True``, indices whose values have been updated
1743 since the last call of the ``sample()`` method are ignored.
1745 Parameters
1746 ----------
1747 indexes : array_like
1748 Indexes to update priorities
1749 priorities : array_like
1750 Priorities to update
1752 Raises
1753 ------
1754 TypeError
1755 When ``indexes`` or ``priorities`` are ``None``
1756 """
1758 if priorities is None:
1759 raise TypeError("``priorities`` must not be ``None``")
1761 cdef const size_t [:] idx = Csize(indexes)
1762 cdef const float [:] ps = Cfloat(priorities)
1764 if not self.check_for_update:
1765 self.per.update_priorities(&idx[0],&ps[0],idx.shape[0])
1766 return None
1768 self.idx_vec.clear()
1769 self.idx_vec.reserve(idx.shape[0])
1771 self.ps_vec.clear()
1772 self.ps_vec.reserve(ps.shape[0])
1774 if self.check_for_update:
1775 for _i in range(idx.shape[0]):
1776 if self.unchange_since_sample[idx[_i]]:
1777 self.idx_vec.push_back(idx[_i])
1778 self.ps_vec.push_back(ps[_i])
1780 cdef N = self.idx_vec.size()
1781 if N > 0:
1782 self.per.update_priorities(self.idx_vec.data(),self.ps_vec.data(),N)
1784 cpdef void clear(self) except *:
1785 r"""Clear replay buffer
1786 """
1787 super(PrioritizedReplayBuffer,self).clear()
1788 clear(self.per)
1789 if self.use_nstep:
1790 self.priorities_nstep.clear()
1792 cpdef float get_max_priority(self):
1793 r"""Get the max priority of stored priorities
1795 Returns
1796 -------
1797 max_priority : float
1798 Max priority of stored priorities
1799 """
1800 return self.per.get_max_priority()
1802 cpdef void on_episode_end(self) except *:
1803 r"""Call on episode end
1805 Finalize the current episode by moving remaining Nstep buffer transitions,
1806 evacuating overlapped data for memory compression features, and resetting
1807 episode length.
1809 Notes
1810 -----
1811 Calling this function at episode end is the user's responsibility,
1812 since episode exploration can be terminated at a certain length
1813 even though no ``done`` flag is set by the environment.
1814 """
1815 if self.use_nstep:
1816 self.use_nstep = False
1817 self.add(**self.nstep.on_episode_end(),
1818 priorities=self.priorities_nstep.on_episode_end()["priorities"])
1819 self.use_nstep = True
1821 self.add_cache()
1823 self.episode_len = 0
1826@cython.embedsignature(True)
1827cdef class ReverseReplayBuffer(ReplayBuffer):
1828 r"""Replay Buffer class for Reverse Experience Replay (RER)
1831 Notes
1832 -----
1833 In Reverse Experience Replay (RER) [1]_, equally strided transitions
1834 are sampled in reverse order.
1836 .. math::
1837 \begin{align}
1838 \text{sample-1}: &T_t , &T_{t-stride} , &\dots, &T_{t-batch\_size\times stride}\\
1839 \text{sample-2}: &T_{t-1}, &T_{t-stride-1}, &\dots, &T_{t-batch\_size\times stride-1}\\
1840 \dots&&&&
1841 \end{align}
1843 When the first index ``t-i`` is delayed from the latest index by more
1844 than ``2*stride``, the first index will be reset to the latest one.
1847 References
1848 ----------
1849 .. [1] E. Rotinov, "Reverse Experience Replay" (2019),
1850 https://arxiv.org/abs/1910.08780
1851 """
1852 cdef size_t stride
1853 cdef size_t last_sampled_index
1854 def __cinit__(self, size, env_dict=None, *, stride = 300, **kwargs):
1855 self.stride = stride
1856 self.last_sampled_index = 0
1858 def __init__(self, size, env_dict=None,*, stride = 300, **kwargs):
1859 r"""
1860 Initialize ReverseReplayBuffer
1862 Parameters
1863 ----------
1864 size : int
1865 Buffer size
1866 next_of : str or array like of str, optional
1867 Value names whose next items share memory region.
1868 The ``"next_"`` prefixed items (eg. ``next_obs`` for ``obs``) are
1869 automatically added to ``env_dict`` without duplicated memory.
1870 stack_compress : str or array like of str, optional
1871 Value names whose duplicated stack dimension is compressed.
1872 The values must have stacked dimension at the last dimension.
1873 default_dtype : numpy.dtype, optional
1874 fallback dtype for values not specified in ``env_dict``. The default is
1875 ``numpy.single``
1876 Nstep : dict, optional
1877 ``Nstep["size"]`` is ``int`` specifying step size of Nstep reward.
1878 ``Nstep["rew"]`` is ``str`` or array like of ``str`` specifying
1879 Nstep reward to be summed. ``Nstep["gamma"]`` is float specifying
1880 discount factor, its default is ``0.99``. ``Nstep["next"]`` is ``str`` or
1881 list of ``str`` specifying next values to be moved.
1882 mmap_prefix : str, optional
1883 File name prefix to save buffer data using mmap. If ``None`` (default),
1884 keep the buffer only in memory.
1885 stride : int, optional
1886 stride size. The default is ``300``.
1889 See Also
1890 --------
1891 ReplayBuffer : Any optional parameters at ReplayBuffer are valid, too.
1892 """
1893 super().__init__(size, env_dict, **kwargs)
1895 def sample(self, batch_size):
1896 r"""Sample the stored transitions reversely
1898 Parameters
1899 ----------
1900 batch_size : int
1901 sampled batch size
1903 Returns
1904 -------
1905 dict of ndarray
1906 Sampled batch transitions, which might contain
1907 the same transition multiple times.
1908 """
1909 cdef size_t nidx = self.get_next_index()
1910 cdef size_t ssize = self.get_stored_size()
1912 cdef size_t tmp_nidx = nidx
1913 if tmp_nidx <= self.last_sampled_index:
1914 tmp_nidx += ssize
1916 if (tmp_nidx - self.last_sampled_index) >= 2 * self.stride:
1917 self.last_sampled_index = (nidx or ssize) - 1
1918 else:
1919 self.last_sampled_index = (self.last_sampled_index or ssize) - 1
1921 cdef idx = np.zeros(batch_size, dtype = np.uint)
1923 # Ensure (idx >= 0).all()
1924 cdef size_t i
1925 cdef size_t tmp = self.last_sampled_index
1926 for i in range(batch_size):
1927 idx[i] = tmp
1928 while tmp < self.stride:
1929 tmp += ssize
1930 tmp -= self.stride
1932 return self._encode_sample(idx)
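# Worked example (illustrative numbers): with ``stored_size = 1000``,
# ``stride = 300`` and ``get_next_index() = 900``, a stale
# ``last_sampled_index`` is reset to 899, and a batch of 3 then reads
# indices 899, 599, 299; indices below ``stride`` wrap via ``+ stored_size``.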
1935@cython.embedsignature(True)
1936cdef class MPReplayBuffer:
1937 r"""Multi-process support Replay Buffer class to store transitions and to sample them randomly.
1939 This class works on multi-process without manual locking of entire buffer.
1941 The transition can contain anything compatible with NumPy data
1942 type. Users can freely specify them by the ``env_dict`` parameter at
1943 the constructor.
1945 The possible standard transition contains observation (``obs``), action (``act``),
1946 reward (``rew``), the next observation (``next_obs``), and done (``done``).
1948 >>> env_dict = {"obs": {"shape": (4,4)},
1949 ... "act": {"shape": 3, "dtype": np.int16},
1950 ... "rew": {},
1951 ... "next_obs": {"shape": (4,4)},
1952 ... "done": {}}
1954 In this class, sampling is random sampling and the same transition
1955 can be chosen multiple times.
1957 See Also
1958 --------
1959 ReplayBuffer : Single process version
1961 Notes
1962 -----
1963 This class assumes single learner (``sample``) and multiple explorers (``add``)
1964 like Ape-X [1]_.
1966 References
1967 ----------
1968 .. [1] D. Horgan et al., "Distributed Prioritized Experience Replay", ICLR (2018),
1969 https://openreview.net/forum?id=H1Dy---0Z
1970 https://arxiv.org/abs/1803.00933
1971 """
1972 cdef buffer
1973 cdef size_t buffer_size
1974 cdef env_dict
1975 cdef ProcessSafeRingBufferIndex index
1976 cdef default_dtype
1977 cdef StepChecker size_check
1978 cdef explorer_ready
1979 cdef explorer_count
1980 cdef explorer_count_lock
1981 cdef learner_ready
1982 cdef backend
1984 def __init__(self, size, env_dict=None, *,
1985 default_dtype=None, logger=None,
1986 ctx=None, backend="sharedctypes",
1987 **kwargs):
1988 r"""Initialize ``MPReplayBuffer``
1990 Parameters
1991 ----------
1992 size : int
1993 Buffer size
1994 env_dict : dict of dict, optional
1995 Dictionary specifying environments. The keys of ``env_dict`` become
1996 environment names. The values of ``env_dict``, which are also ``dict``,
1997 define ``"shape"`` (default ``1``) and ``"dtype"`` (fallback to
1998 ``default_dtype``).
1999 default_dtype : numpy.dtype, optional
2000 Fallback dtype for keys not specified in ``env_dict``.
2001 The default is ``numpy.single``.
2002 ctx : ForkContext, SpawnContext, or SyncManager, optional
2003 Context created by ``multiprocessing.get_context()`` or ``SyncManager``.
2004 If ``None`` (default), the default context is used.
2005 backend : {"sharedctypes", "SharedMemory"}
2006 Shared memory (shm) backend to map buffer. The default is
2007 ``"sharedctypes"``. ``"SharedMemory"`` is available only for Python 3.8+.
2008 """
2009 self.env_dict = env_dict.copy() if env_dict else {}
2010 ctx = ctx or mp.get_context()
2011 try_start(ctx)
2013 if not _has_SharedMemory and backend == "SharedMemory":
2014 backend = "sharedctypes"
2015 self.backend = backend
2017 cdef special_keys = []
2019 self.buffer_size = size
2020 self.index = ProcessSafeRingBufferIndex(self.buffer_size, ctx,
2021 self.backend)
2023 self.default_dtype = default_dtype or np.single
2025 # side effect: Add "add_shape" key into self.env_dict
2026 self.buffer = dict2buffer(self.buffer_size,self.env_dict,
2027 default_dtype = self.default_dtype,
2028 shared = self.backend,
2029 ctx = ctx)
2031 self.size_check = StepChecker(self.env_dict,special_keys)
2033 self.learner_ready = ctx.Event()
2034 self.learner_ready.clear()
2035 self.explorer_ready = ctx.Event()
2036 self.explorer_ready.set()
2037 self.explorer_count = RawValue(ctx, ctypes.c_size_t, 0, self.backend)
2038 self.explorer_count_lock = ctx.Lock()
2040 cdef void _lock_explorer(self) except *:
2041 self.explorer_ready.wait() # Wait permission
2042 self.learner_ready.clear() # Block learner
2043 with self.explorer_count_lock:
2044 self.explorer_count.value += 1
2046 cdef void _unlock_explorer(self) except *:
2047 with self.explorer_count_lock:
2048 self.explorer_count.value -= 1
2049 if self.explorer_count.value == 0:
2050 self.learner_ready.set()
2052 cdef void _lock_learner(self) except *:
2053 self.explorer_ready.clear() # New explorer cannot enter into critical section
2054 self.learner_ready.wait() # Wait until all explorer exit from critical section
2056 cdef void _unlock_learner(self) except *:
2057 self.explorer_ready.set() # Allow workers to enter into critical section
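# The four methods above form a writer-preference gate built from two
# Events and a shared counter: any number of explorers may be inside the
# critical section at once, while the learner first closes the gate to
# new explorers and then waits for the count to drain to zero. A minimal
# pure-Python sketch of the same pattern (names are hypothetical):
#
# import multiprocessing as mp
#
# def explorer_enter(explorer_ready, learner_ready, count, lock):
#     explorer_ready.wait()    # Wait until the learner opens the gate
#     learner_ready.clear()    # At least one explorer is now inside
#     with lock:
#         count.value += 1
#
# def explorer_exit(learner_ready, count, lock):
#     with lock:
#         count.value -= 1
#         if count.value == 0: # The last explorer out wakes the learner
#             learner_ready.set()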
2059 def add(self,*,**kwargs):
2060 r"""Add transition(s) into replay buffer.
2062 Multiple sets of transitions can be added simultaneously. This method
2063 can be called from multiple explorer processes without manual lock.
2065 Parameters
2066 ----------
2067 **kwargs : array like or float or int
2068 Transitions to be stored.
2070 Returns
2071 -------
2072 int
2073 The first index of stored position.
2075 Raises
2076 ------
2077 KeyError
2078 If any values defined at constructor are missing.
2080 Warnings
2081 --------
2082 All values must be passed by key-value style (keyword arguments).
2083 It is the user's responsibility to ensure that all the values have the same step size.
2084 """
2085 cdef size_t N = self.size_check.step_size(kwargs)
2087 cdef size_t index = self.index.fetch_add(N)
2088 cdef size_t end = index + N
2089 cdef add_idx = np.arange(index,end)
2091 if end > self.buffer_size:
2092 add_idx[add_idx >= self.buffer_size] -= self.buffer_size
2095 self._lock_explorer()
2097 for name, b in self.buffer.items():
2098 b[add_idx] = np.reshape(np.array(kwargs[name],copy=False,ndmin=2),
2099 self.env_dict[name]["add_shape"])
2101 self._unlock_explorer()
2102 return index
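# A quick sketch of the ring-buffer wraparound handled above (hypothetical
# values): with ``buffer_size = 5``, ``index = 3``, and ``N = 4``, the
# linear range [3, 7) folds back into [3, 4, 0, 1].
#
# >>> import numpy as np
# >>> buffer_size, index, N = 5, 3, 4
# >>> add_idx = np.arange(index, index + N)
# >>> add_idx[add_idx >= buffer_size] -= buffer_size
# >>> add_idx
# array([3, 4, 0, 1])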
2104 def get_all_transitions(self,shuffle: bool=False):
2105 r"""
2106 Get all transitions stored in replay buffer.
2108 Parameters
2109 ----------
2110 shuffle : bool, optional
2111 When ``True``, transitions are shuffled. The default value is ``False``.
2113 Returns
2114 -------
2115 transitions : dict of numpy.ndarray
2116 All transitions stored in this replay buffer.
2117 """
2118 idx = np.arange(self.get_stored_size())
2120 if shuffle:
2121 np.random.shuffle(idx)
2123 self._lock_learner()
2124 ret = self._encode_sample(idx)
2125 self._unlock_learner()
2127 return ret
2129 def _encode_sample(self,idx):
2130 cdef sample = {}
2132 idx = np.array(idx,copy=False,ndmin=1)
2134 for name, b in self.buffer.items():
2135 sample[name] = b[idx]
2137 return sample
2139 def sample(self,batch_size):
2140 r"""Sample the stored transitions randomly with specified size
2142 This method can be called from a single learner process.
2144 Parameters
2145 ----------
2146 batch_size : int
2147 sampled batch size
2149 Returns
2150 -------
2151 dict of ndarray
2152 Sampled batch transitions, which might contain
2153 the same transition multiple times.
2154 """
2155 cdef idx = np.random.randint(0,self.get_stored_size(),batch_size)
2157 self._lock_learner()
2158 ret = self._encode_sample(idx)
2159 self._unlock_learner()
2161 return ret
2163 cpdef void clear(self) except *:
2164 r"""Clear replay buffer.
2166 Set ``index`` and ``stored_size`` to ``0``.
2168 Example
2169 -------
2170 >>> rb = ReplayBuffer(5,{"done": {}})
2171 >>> rb.add(done=1)
2172 >>> rb.get_stored_size()
2173 1
2174 >>> rb.get_next_index()
2175 1
2176 >>> rb.clear()
2177 >>> rb.get_stored_size()
2178 0
2179 >>> rb.get_next_index()
2180 0
2181 """
2182 self.index.clear()
2184 cpdef size_t get_stored_size(self):
2185 r"""Get stored size
2187 Returns
2188 -------
2189 size_t
2190 Stored size
2191 """
2192 return self.index.get_stored_size()
2194 cpdef size_t get_buffer_size(self):
2195 r"""Get buffer size
2197 Returns
2198 -------
2199 size_t
2200 Buffer size
2201 """
2202 return self.buffer_size
2204 cpdef size_t get_next_index(self):
2205 r"""Get the next index to store
2207 Returns
2208 -------
2209 size_t
2210 Next index to store
2211 """
2212 return self.index.get_next_index()
2214 cpdef void on_episode_end(self) except *:
2215 r"""Call on episode end
2217 Notes
2218 -----
2219 Calling this function at episode end is the user's responsibility,
2220 since episode exploration can be terminated at a certain length
2221 even though no ``done`` flag from the environment is set.
2223 Warnings
2224 --------
2225 Although nothing happens for ``MPReplayBuffer``, calling this method is
2226 recommended because some functionality might be added in a future version.
2227 """
2228 pass
2230 cpdef bool is_Nstep(self):
2231 r"""Get whether use Nstep or not
2233 Since ``MPReplayBuffer`` doesn't supports Nstep feature,
2234 return value is always ``False``.
2236 Returns
2237 -------
2238 bool
2239 ``False``
2240 """
2241 return False
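# A hedged end-to-end sketch of the intended single-learner /
# multi-explorer usage (the explorer function and its data are
# hypothetical):
#
# >>> import multiprocessing as mp
# >>> import numpy as np
# >>> rb = MPReplayBuffer(1024, {"obs": {"shape": 4}, "rew": {}, "done": {}})
# >>> def explorer(rb):
# ...     for _ in range(100):
# ...         rb.add(obs=np.ones(4), rew=1.0, done=0.0)
# >>> ps = [mp.Process(target=explorer, args=(rb,)) for _ in range(2)]
# >>> for p in ps: p.start()
# >>> for p in ps: p.join()
# >>> batch = rb.sample(32)  # Learner-side random sampling
# >>> batch["obs"].shape
# (32, 4)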
2244cdef class ThreadSafePrioritizedSampler:
2245 cdef size_t size
2246 cdef float alpha
2247 cdef float eps
2248 cdef backend
2249 cdef max_p
2250 cdef sum
2251 cdef sum_a # any-changed flag for the sum tree
2252 cdef min
2253 cdef min_a # any-changed flag for the min tree
2254 cdef CppThreadSafePrioritizedSampler[float]* per
2256 def __init__(self,size,alpha,eps,max_p=None,
2257 sum=None,sum_a=None,
2258 min=None,min_a=None,
2259 ctx = None,
2260 backend = "sharedctypes"):
2261 ctx = ctx or mp.get_context()
2262 self.size = size
2263 self.alpha = alpha
2264 self.eps = eps
2265 self.backend = backend
2267 self.max_p = max_p or RawArray(ctx, ctypes.c_float,1,self.backend)
2268 cdef float [:] view_max_p = self.max_p.ndarray
2270 cdef size_t pow2size = 1
2271 while pow2size < size:
2272 pow2size *= 2
2274 self.sum = sum or RawArray(ctx, ctypes.c_float,2*pow2size-1, self.backend)
2275 self.sum_a = sum_a or RawArray(ctx, ctypes.c_bool ,1 , self.backend)
2276 self.min = min or RawArray(ctx, ctypes.c_float,2*pow2size-1, self.backend)
2277 self.min_a = min_a or RawArray(ctx, ctypes.c_bool ,1 , self.backend)
2279 cdef float [:] view_sum = self.sum.ndarray
2280 cdef bool [:] view_sum_a = self.sum_a.ndarray
2281 cdef float [:] view_min = self.min.ndarray
2282 cdef bool [:] view_min_a = self.min_a.ndarray
2284 cdef bool init = ((max_p is None) and
2285 (sum is None) and
2286 (sum_a is None) and
2287 (min is None) and
2288 (min_a is None))
2290 self.per = new CppThreadSafePrioritizedSampler[float](size,alpha,
2291 &view_max_p[0],
2292 &view_sum[0],
2293 &view_sum_a[0],
2294 &view_min[0],
2295 &view_min_a[0],
2296 init,
2297 eps)
2299 cdef CppThreadSafePrioritizedSampler[float]* ptr(self):
2300 return self.per
2302 def __reduce__(self):
2303 return (ThreadSafePrioritizedSampler,
2304 (self.size, self.alpha, self.eps, self.max_p,
2305 self.sum, self.sum_a, self.min, self.min_a,
2306 None, self.backend))
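# Note on the sizing above: the capacity is rounded up to the next power
# of two, and a complete binary tree over ``pow2size`` leaves needs
# ``2 * pow2size - 1`` nodes. A quick check with a hypothetical size:
#
# >>> size = 5
# >>> pow2size = 1
# >>> while pow2size < size:
# ...     pow2size *= 2
# >>> pow2size, 2 * pow2size - 1
# (8, 15)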
2309@cython.embedsignature(True)
2310cdef class MPPrioritizedReplayBuffer(MPReplayBuffer):
2311 r"""Multi-process support Prioritized Replay Buffer class to store transitions with priorities.
2313 This class can work across multiple processes without manual locking.
2315 In this class, transitions are sampled with probabilities corresponding to their priorities.
2317 Notes
2318 -----
2319 This class assumes a single learner (``sample``, ``update_priorities``) and
2320 multiple explorers (``add``) like Ape-X [1]_.
2322 References
2323 ----------
2324 .. [1] D. Horgan et al., "Distributed Prioritized Experience Replay", ICLR (2018),
2325 https://openreview.net/forum?id=H1Dy---0Z
2326 https://arxiv.org/abs/1803.00933
2327 """
2328 cdef VectorFloat weights
2329 cdef VectorSize_t indexes
2330 cdef ThreadSafePrioritizedSampler per
2331 cdef unchange_since_sample
2332 cdef terminate
2333 cdef explorer_per_count
2334 cdef explorer_per_count_lock
2335 cdef learner_per_ready
2336 cdef explorer_per_ready
2337 cdef vector[size_t] idx_vec
2338 cdef vector[float] ps_vec
2340 def __init__(self,size,env_dict=None,*,alpha=0.6,eps=1e-4,ctx=None,**kwargs):
2341 r"""Initialize ``MPPrioritizedReplayBuffer``
2343 Parameters
2344 ----------
2345 size : int
2346 Buffer size
2347 env_dict : dict of dict, optional
2348 Dictionary specifying environments. The keys of ``env_dict`` become
2349 environment names. The values of ``env_dict``, which are also ``dict``,
2350 define ``"shape"`` (default ``1``) and ``"dtype"`` (fallback to
2351 ``default_dtype``)
2352 alpha : float, optional
2353 :math:`\alpha`, the exponent applied to the stored priorities, whose
2354 default value is ``0.6``
2355 eps : float, optional
2356 :math:`\epsilon`, a small positive constant ensuring that transitions with
2357 zero error can still be sampled, whose default value is ``1e-4``.
2359 See Also
2360 --------
2361 MPReplayBuffer : Any optional parameters at ``MPReplayBuffer`` are valid, too.
2362 PrioritizedReplayBuffer : Single process version Prioritized Experience Replay
2365 Notes
2366 -----
2367 The minimum and the summation over certain ranges of the pre-calculated
2368 priorities :math:`(p_{i} + \epsilon )^{ \alpha }` are stored in segment trees,
2369 which enable fast sampling.
2370 """
2371 ctx = ctx or mp.get_context()
2372 super().__init__(size,env_dict,ctx=ctx,**kwargs)
2374 self.per = ThreadSafePrioritizedSampler(size,alpha,eps,
2375 ctx=ctx, backend=self.backend)
2377 self.weights = VectorFloat()
2378 self.indexes = VectorSize_t()
2380 self.unchange_since_sample = RawArray(ctx, ctypes.c_bool, size, self.backend)
2381 self.unchange_since_sample[:] = True
2383 self.terminate = RawValue(ctx, ctypes.c_bool,0, self.backend)
2384 self.terminate.value = False
2386 self.learner_per_ready = ctx.Event()
2387 self.learner_per_ready.clear()
2388 self.explorer_per_ready = ctx.Event()
2389 self.explorer_per_ready.set()
2390 self.explorer_per_count = RawValue(ctx, ctypes.c_size_t, 0, self.backend)
2391 self.explorer_per_count_lock = ctx.Lock()
2393 self.idx_vec = vector[size_t]()
2394 self.ps_vec = vector[float]()
2397 cdef void _lock_explorer_per(self) except *:
2398 self.explorer_per_ready.wait() # Wait permission
2399 self.learner_per_ready.clear() # Block learner
2400 with self.explorer_per_count_lock:
2401 self.explorer_per_count.value += 1
2403 cdef void _unlock_explorer_per(self) except *:
2404 with self.explorer_per_count_lock:
2405 self.explorer_per_count.value -= 1
2406 if self.explorer_per_count.value == 0:
2407 self.learner_per_ready.set()
2409 cdef void _lock_learner_per(self) except *:
2410 self.explorer_per_ready.clear()
2411 self.learner_per_ready.wait()
2413 cdef void _unlock_learner_per(self) except *:
2414 self.explorer_per_ready.set()
2416 cdef void _lock_learner_unlock_learner_per(self) except *:
2417 self.explorer_ready.clear()
2418 self.explorer_per_ready.set()
2419 self.learner_ready.wait()
2421 def add(self,*,priorities = None,**kwargs):
2422 r"""Add transition(s) into replay buffer.
2424 Multiple sets of transitions can be added simultaneously. This method can be
2425 called from multiple explorer processes without manual lock.
2427 Parameters
2428 ----------
2429 priorities : array like or float, optional
2430 Priorities of each transition. When no priorities are passed,
2431 the maximum priority observed so far is used.
2432 **kwargs : array like or float or int
2433 Transitions to be stored.
2435 Returns
2436 -------
2437 int
2438 The first index of stored position.
2440 Raises
2441 ------
2442 KeyError
2443 If any values defined at constructor are missing.
2445 Warnings
2446 --------
2447 All values must be passed by key-value style (keyword arguments).
2448 It is the user's responsibility to ensure that all the values have the same step size.
2449 """
2450 cdef size_t N = self.size_check.step_size(kwargs)
2451 cdef const float [:] ps
2453 if priorities is not None:
2454 priorities = np.ravel(np.array(priorities,copy=False,
2455 ndmin=1,dtype=np.single))
2456 if N != priorities.shape[0]:
2457 raise ValueError("`priorities` shape is incompatible")
2459 cdef size_t index = self.index.fetch_add(N)
2460 cdef size_t end = index + N
2461 cdef add_idx = np.arange(index,end)
2463 if end > self.buffer_size:
2464 add_idx[add_idx >= self.buffer_size] -= self.buffer_size
2467 self._lock_explorer_per()
2469 if priorities is not None:
2470 ps = np.ravel(np.array(priorities,copy=False,ndmin=1,dtype=np.single))
2471 self.per.ptr().set_priorities(index,&ps[0],N,self.get_buffer_size())
2472 else:
2473 self.per.ptr().set_priorities(index,N,self.get_buffer_size())
2475 if index+N <= self.buffer_size:
2476 self.unchange_since_sample[index:index+N] = False
2477 else:
2478 self.unchange_since_sample[index:] = False
2479 self.unchange_since_sample[:index+N-self.buffer_size] = False
2481 self._lock_explorer()
2482 self._unlock_explorer_per()
2484 for name, b in self.buffer.items():
2485 b[add_idx] = np.reshape(np.array(kwargs[name],copy=False,ndmin=2),
2486 self.env_dict[name]["add_shape"])
2488 self._unlock_explorer()
2489 return index
2491 def sample(self,batch_size,beta = 0.4):
2492 r"""Sample the stored transitions.
2494 Transitions are sampled according to their corresponding priorities,
2495 with the specified batch size. This method can be called from a single learner process.
2497 Parameters
2498 ----------
2499 batch_size : int
2500 Sampled batch size
2501 beta : float, optional
2502 The exponent of the weights for relaxing the importance
2503 sampling effect, whose default value is ``0.4``
2505 Returns
2506 -------
2507 dict of ndarray
2508 Sampled batch transitions which also includes
2509 ``"weights"`` and ``"indexes"``
2511 Notes
2512 -----
2513 When ``beta`` is ``0``, the weights become uniform. When ``beta`` is ``1``,
2514 they become the usual importance sampling weights.
2515 The ``weights`` are also normalized by the weight for the minimum priority
2516 (:math:`= w_{i}/\max_{j}(w_{j})`), which ensures the weights :math:`\leq` 1.
2517 """
2518 self._lock_learner_per()
2519 self.per.ptr().sample(batch_size,beta,
2520 self.weights.vec,self.indexes.vec,
2521 self.get_stored_size())
2522 cdef idx = self.indexes.as_numpy()
2524 self._lock_learner_unlock_learner_per()
2526 samples = self._encode_sample(idx)
2527 self.unchange_since_sample[:] = True
2528 self._unlock_learner()
2530 samples['weights'] = self.weights.as_numpy()
2531 samples['indexes'] = idx
2533 return samples
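# A NumPy sketch of the importance-sampling weights described in the
# Notes above (the actual computation happens inside the C++ sampler;
# the probabilities here are hypothetical):
#
# >>> import numpy as np
# >>> p = np.array([0.1, 0.4, 0.5])  # Sampling probabilities P(i)
# >>> beta, N = 0.4, 3
# >>> w = (N * p) ** (-beta)         # w_i = (N * P(i))^-beta
# >>> w /= w.max()                   # Normalize so that weights <= 1
# >>> np.round(w, 3)
# array([1.   , 0.574, 0.525])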
2535 def update_priorities(self,indexes,priorities):
2536 r"""Update priorities
2538 Update priorities at the specified indices. Indices whose transitions
2539 have been overwritten since the last call of the ``sample()``
2540 method are ignored. This method can be called from a single learner process.
2542 Parameters
2543 ----------
2544 indexes : array_like
2545 Indexes to update priorities
2546 priorities : array_like
2547 Priorities to update
2549 Raises
2550 ------
2551 TypeError
2552 When ``indexes`` or ``priorities`` are ``None``
2553 """
2555 if priorities is None:
2556 raise TypeError("`priorities` must not be `None`")
2558 cdef const size_t [:] idx = Csize(indexes)
2559 cdef const float [:] ps = Cfloat(priorities)
2561 self.idx_vec.clear()
2562 self.idx_vec.reserve(idx.shape[0])
2564 self.ps_vec.clear()
2565 self.ps_vec.reserve(ps.shape[0])
2567 self._lock_learner_per()
2568 cdef size_t stored_size = self.get_stored_size()
2569 for _i in range(idx.shape[0]):
2570 if idx[_i] < stored_size and self.unchange_since_sample[idx[_i]]:
2571 self.idx_vec.push_back(idx[_i])
2572 self.ps_vec.push_back(ps[_i])
2574 cdef N = self.idx_vec.size()
2575 if N > 0:
2576 self.per.ptr().update_priorities(self.idx_vec.data(),self.ps_vec.data(),N)
2577 self._unlock_learner_per()
2579 cpdef void clear(self) except *:
2580 r"""Clear replay buffer
2581 """
2582 super(MPPrioritizedReplayBuffer,self).clear()
2583 clear(self.per.ptr())
2585 cpdef float get_max_priority(self):
2586 r"""Get the max priority of stored priorities
2588 Returns
2589 -------
2590 max_priority : float
2591 The max priority of stored priorities
2592 """
2593 return self.per.ptr().get_max_priority()
2595 cpdef void on_episode_end(self) except *:
2596 r"""Call on episode end
2598 Notes
2599 -----
2600 Calling this function at episode end is the user responsibility,
2601 since episode exploration can be terminated at certain length
2602 even though any ``done`` flags from environment is not set.
2603 """
2604 pass
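# A hedged learner-side sketch of the prioritized workflow (hypothetical
# data; explorer processes call ``add`` exactly as for ``MPReplayBuffer``):
#
# >>> import numpy as np
# >>> rb = MPPrioritizedReplayBuffer(256, {"obs": {"shape": 4}, "rew": {}})
# >>> rb.add(obs=np.ones(4), rew=1.0, priorities=0.5)
# 0
# >>> sample = rb.sample(8, beta=0.4)
# >>> abs_td = np.abs(np.random.rand(8))  # Stand-in for |TD| errors
# >>> rb.update_priorities(sample["indexes"], abs_td)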
2607@cython.embedsignature(True)
2608def create_buffer(size,env_dict=None,*,prioritized = False,**kwargs):
2609 r"""Create specified version of replay buffer
2611 Parameters
2612 ----------
2613 size : int
2614 Buffer size
2615 env_dict : dict of dict, optional
2616 Dictionary specifying environments. The keys of ``env_dict`` become
2617 environment names. The values of ``env_dict``, which are also ``dict``,
2618 define ``"shape"`` (default ``1``) and ``"dtype"`` (fallback to
2619 ``default_dtype``)
2620 prioritized : bool, optional
2621 Whether to create the prioritized version of the replay buffer. The default is ``False``.
2623 Returns
2624 -------
2625 ReplayBuffer or PrioritizedReplayBuffer
2626 Replay Buffer
2628 Raises
2629 ------
2630 NotImplementedError
2631 If a not-implemented version of the replay buffer is specified
2633 Notes
2634 -----
2635 Any other keyword arguments are passed to replay buffer constructor.
2637 See Also
2638 --------
2639 ReplayBuffer, PrioritizedReplayBuffer
2640 """
2641 per = "Prioritized" if prioritized else ""
2643 buffer_name = f"{per}ReplayBuffer"
2645 cls={"ReplayBuffer": ReplayBuffer,
2646 "PrioritizedReplayBuffer": PrioritizedReplayBuffer}
2648 buffer = cls.get(buffer_name, None)
2650 if buffer:
2651 return buffer(size,env_dict,**kwargs)
2653 raise NotImplementedError(f"{buffer_name} is not implemented")
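# Usage sketch for the factory above:
#
# >>> rb = create_buffer(32, {"obs": {"shape": 4}, "rew": {}})
# >>> type(rb).__name__
# 'ReplayBuffer'
# >>> per = create_buffer(32, {"obs": {"shape": 4}, "rew": {}},
# ...                     prioritized=True)
# >>> type(per).__name__
# 'PrioritizedReplayBuffer'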
2656@cython.embedsignature(True)
2657def train(buffer: ReplayBuffer,
2658 env,
2659 get_action: Callable,
2660 update_policy: Callable,*,
2661 max_steps: int=int(1e6),
2662 max_episodes: Optional[int] = None,
2663 batch_size: int = 64,
2664 n_warmups: int = 0,
2665 after_step: Optional[Callable] = None,
2666 done_check: Optional[Callable] = None,
2667 obs_update: Optional[Callable] = None,
2668 rew_sum: Optional[Callable[[float, Any], float]] = None,
2669 episode_callback: Optional[Callable[[int,int,float],Any]] = None,
2670 logger = None):
2671 r"""
2672 Train RL policy (model)
2674 Parameters
2675 ----------
2676 buffer: ReplayBuffer
2677 Buffer to be used for training
2678 env: gym.Env compatible
2679 Environment to learn
2680 get_action: Callable
2681 Callable taking ``obs``, ``step``, ``episode``, and ``is_warmup``, and returning ``action``
2682 update_policy: Callable
2683 Callable taking ``sample``, ``step``, and ``episode``, updating policy,
2684 and returning :math:`|TD|`.
2685 max_steps: int, optional
2686 Maximum steps to learn. The default value is ``1000000``
2687 max_episodes: int, optional
2688 Maximum episodes to learn. The default value is ``None``
batch_size: int, optional
Batch size for each sampling. The default value is ``64``
2689 n_warmups: int, optional
2690 Warmup steps before sampling. The default value is ``0`` (No warmup)
2691 after_step: Callable, optional
2692 Callable converting ``obs``, ``action``, the returns of ``env.step(action)``,
2693 ``step``, and ``episode`` into a ``dict`` transition for
2694 ``ReplayBuffer.add``.
2695 This function can also be used for step summary callback.
2696 done_check: Callable, optional
2697 Callable checking done
2698 obs_update: Callable, optional
2699 Callable updating obs
2700 rew_sum: Callable[[float, Dict], float], optional
2701 Callable summarizing episode reward
2702 episode_callback: Callable[[int, int, float], Any], optional
2703 Callable for episode summarization
2704 logger: logging.Logger, optional
2705 Custom Logger
2707 Raises
2708 ------
2709 ValueError:
2710 When ``max_steps`` is larger than the ``size_t`` limit
2712 Warnings
2713 --------
2714 ``cpprb.train`` is still a beta release. The API may change.
2715 """
2716 warnings.warn("`cpprb.train` is still a beta release. The API may change.")
2718 logger = logger or default_logger()
2720 cdef size_t size_t_limit = -1 # -1 wraps around to the maximum size_t value
2721 if max_steps >= int(size_t_limit):
2722 raise ValueError(f"max_steps ({max_steps}) is too big. " +
2723 f"max_steps < {size_t_limit}")
2725 cdef bool use_per = isinstance(buffer,PrioritizedReplayBuffer)
2726 cdef bool has_after_step = after_step
2727 cdef bool has_check = done_check
2728 cdef bool has_obs_update = obs_update
2729 cdef bool has_rew_sum = rew_sum
2730 cdef bool has_episode_callback = episode_callback
2732 cdef size_t _max_steps = max(max_steps,0)
2733 cdef size_t _max_episodes = min(max(max_episodes or size_t_limit, 0),size_t_limit)
2734 cdef size_t _n_warmup = min(max(0,n_warmups),size_t_limit)
2736 cdef size_t step = 0
2737 cdef size_t episode = 0
2738 cdef size_t episode_step = 0
2739 cdef float episode_reward = 0.0
2740 cdef bool is_warmup = True
2742 obs = env.reset()
2743 cdef double episode_start_time = time.perf_counter()
2744 cdef double episode_end_time = 0.0
2745 for step in range(_max_steps):
2746 is_warmup = (step < _n_warmup)
2748 # Get action
2749 action = get_action(obs,step,episode,is_warmup)
2751 # Step environment
2752 if has_after_step:
2753 transition = after_step(obs,action,env.step(action),step,episode)
2754 else:
2755 next_obs, reward, done, _ = env.step(action)
2756 transition = {"obs": obs,
2757 "act": action,
2758 "rew": reward,
2759 "next_obs": next_obs,
2760 "done": done}
2762 # Add to buffer
2763 buffer.add(**transition)
2765 # For Nstep, the ReplayBuffer can still be empty after the `add(**transition)` call
2766 if (buffer.get_stored_size() > 0) and (not is_warmup):
2767 # Sample
2768 sample = buffer.sample(batch_size)
2769 absTD = update_policy(sample,step,episode)
2771 if use_per:
2772 buffer.update_priorities(sample["indexes"],absTD)
2774 # Summarize reward
2775 episode_reward = (rew_sum(episode_reward,transition) if has_rew_sum
2776 else episode_reward + transition["rew"])
2778 # Prepare the next step
2779 if done_check(transition) if has_check else transition["done"]:
2780 episode_end_time = time.perf_counter()
2782 # step/episode_step are indices.
2783 # Total Steps/Episode Steps are counts.
2784 SPS = (episode_step+1) / max(episode_end_time-episode_start_time,1e-9)
2785 logger.info(f"{episode: 6}th Episode: " +
2786 f"{episode_step+1: 5} Steps " +
2787 f"({step+1: 7} Total Steps), " +
2788 f"{episode_reward: =+7.2f} Reward, " +
2789 f"{SPS: =+5.2f} Steps/s")
2791 # Summary
2792 if has_episode_callback:
2793 episode_callback(episode,episode_step,episode_reward)
2795 # Reset
2796 obs = env.reset()
2797 buffer.on_episode_end()
2798 episode_reward = 0.0
2799 episode_step = 0
2801 # Update episode count
2802 episode += 1
2803 if episode >= _max_episodes:
2804 break
2806 episode_start_time = time.perf_counter()
2807 else:
2808 obs = obs_update(transition) if has_obs_update else transition["next_obs"]
2809 episode_step += 1
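# A hedged sketch of driving ``train`` (``gym``, the environment name,
# and the dummy policy/update functions are assumptions for illustration):
#
# >>> import gym
# >>> import numpy as np
# >>> env = gym.make("CartPole-v1")
# >>> rb = ReplayBuffer(1024, {"obs": {"shape": 4}, "act": {}, "rew": {},
# ...                          "next_obs": {"shape": 4}, "done": {}})
# >>> def get_action(obs, step, episode, is_warmup):
# ...     return env.action_space.sample()
# >>> def update_policy(sample, step, episode):
# ...     return np.zeros(sample["obs"].shape[0])  # Dummy |TD| errors
# >>> train(rb, env, get_action, update_policy,
# ...       max_steps=1000, batch_size=32, n_warmups=100)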